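# NOTE: text recovered from a web archive listing; the site navigation banner
# ("home | CD-ROM | disk | FTP | other | search") is not part of the module.
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)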
-
- from __future__ import with_statement
- from util.introspect import use_profiler
# Names exported by a star-import of this module.
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread']
# Module metadata strings, kept exactly as they appear in the decompiled file.
__author__ = 'Christopher Arndt'
__version__ = '1.2.3'
__revision__ = '$Revision: 1.5 $'
__date__ = '$Date: 2006/06/23 12:32:25 $'
__license__ = 'Python license'
- import sys
- import threading
- import Queue
- from util.introspect import callany
- from bgthread import BackgroundThread
-
def requesthash(request):
    """Return the key under which *request* is tracked in ``runningNow``.

    The key is a 2-tuple of the object itself and its ``im_self`` attribute
    (``None`` when the object has no such attribute).
    """
    owner = getattr(request, 'im_self', None)
    return (request, owner)
-
-
class NoResultsPending(Exception):
    """Signals that there are no outstanding results left to collect."""
-
-
class NoWorkersAvailable(Exception):
    """Signals that no worker threads are available to process requests."""
-
-
class WorkerThread(BackgroundThread):
    """Daemon thread that pulls work requests off a pool's queue and runs them.

    Each request's callable is invoked in this thread; its callbacks are
    dispatched from this thread as well (via ``callany``).  The thread keeps
    looping until ``dismiss()`` has been called.
    """

    def __init__(self, threadPool, **kwds):
        """Attach to *threadPool*'s shared state and start running immediately.

        threadPool -- object providing ``requestsQueue``, ``resultsQueue``,
                      ``runningNow`` and ``runningNowLock``.
        kwds       -- forwarded to ``BackgroundThread.__init__``; a thread
                      name of the form ``Wkr<N>`` is generated if none given.
        """
        if 'name' not in kwds:
            kwds['name'] = threading._newname('Wkr%d')

        BackgroundThread.__init__(self, **kwds)
        self.setDaemon(1)
        self.workRequestQueue = threadPool.requestsQueue
        self.resultQueue = threadPool.resultsQueue
        self.runningNow = threadPool.runningNow
        self.runningNowLock = threadPool.runningNowLock
        self._dismissed = threading.Event()
        # The thread is started from its own constructor.
        self.start()

    def run(self):
        """Thread entry point: run the work loop between the Before/After hooks."""
        # BeforeRun/AfterRun are presumably provided by BackgroundThread --
        # NOTE(review): confirm against bgthread.
        self.BeforeRun()
        use_profiler(self, self._run)
        self.AfterRun()

    def _run(self):
        """Process queued requests until this worker has been dismissed."""
        while not self._dismissed.isSet():
            # Simple per-thread iteration counter.
            self.loopcount = getattr(self, 'loopcount', 0) + 1

            # Blocks until a request is available.
            request = self.workRequestQueue.get()
            if self._dismissed.isSet():
                # Dismissed while waiting: hand the request back so another
                # worker can pick it up, then exit the loop.
                self.workRequestQueue.put(request)
                break

            try:
                result = request.callable(*request.args, **request.kwds)
            except Exception:
                # sys.exc_info() keeps this valid on Python 2.5 and later.
                e = sys.exc_info()[1]
                # WorkRequest.__init__ does not set ``verbose``; treat a
                # missing attribute as "not verbose" instead of raising
                # AttributeError from inside the handler.
                if getattr(request, 'verbose', False):
                    import traceback
                    sys.stderr.write('threadpool: this error is being passed to exception handler (or being ignored):\n\n')
                    traceback.print_exc()

                request.exception = True
                request.exception_instance = e
                result = None

            failed_and_handled = request.exception and request.exc_callback
            if failed_and_handled:
                callany(request.exc_callback, request.exception_instance)

            # The regular callback runs on success, and also on failure when
            # no exception callback was supplied (result is None then).
            if request.callback and not failed_and_handled:
                callany(request.callback, result)

            # Release the lock on every path; a bare __enter__() would leave
            # it held by this thread forever.
            with self.runningNowLock:
                self.runningNow.discard(requesthash(request.callable))

    def dismiss(self):
        """Ask the worker to exit once it next checks its dismissed flag."""
        self._dismissed.set()
-
-
-
class WorkRequest(object):
    """A unit of work: a callable plus its arguments and optional callbacks.

    Attributes set here:
      requestID    -- caller-supplied hashable id, or ``id(self)`` by default
      exception    -- False initially; the worker sets it to True when the
                      callable raises
      callback     -- called with the result after the callable finishes
      exc_callback -- called when the callable raised
      callable     -- the function to run
      args, kwds   -- positional / keyword arguments for ``callable``
    """

    def __init__(self, callable, args = None, kwds = None, requestID = None, callback = None, exc_callback = None):
        """Store the work description.

        Raises TypeError if an explicit *requestID* is not hashable.
        """
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                hash(requestID)
            except TypeError:
                raise TypeError('requestID must be hashable.')

            self.requestID = requestID

        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable
        # Keep the caller's arguments; fall back to fresh empty containers
        # only when nothing (or something empty) was passed.
        self.args = args or []
        self.kwds = kwds or {}
-
-
-
class ThreadPool(object):
    """Distributes work requests to a set of WorkerThread instances.

    The queues, the ``runningNow`` set, its lock and the worker list are
    class attributes, so they are shared by every ThreadPool instance.
    """

    requestsQueue = Queue.Queue()
    resultsQueue = Queue.Queue()
    # Keys (see requesthash) of callables that have been submitted and not
    # yet finished; guarded by runningNowLock.
    runningNow = set()
    runningNowLock = threading.RLock()
    workers = []

    def __init__(self, num_workers = 0, q_size = 0):
        """Set the request queue's maximum size and start *num_workers* workers."""
        self.requestsQueue.maxsize = q_size
        self.createWorkers(num_workers)

    def createWorkers(self, num_workers):
        """Add *num_workers* new worker threads to the pool."""
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self))

    def joinAll(self):
        """Dismiss every worker and wait for all of them to exit."""
        for worker in self.workers:
            worker.dismiss()

        # Submit one no-op job so a worker blocked in Queue.get() wakes up and
        # notices it was dismissed.  ``threaded`` presumably wraps a callable
        # so that calling it queues the work on the pool --
        # NOTE(review): confirm against threadpool2.
        from threadpool2 import threaded
        threaded(lambda: None)()

        for worker in self.workers:
            worker.join()

    def dismissWorkers(self, num_workers):
        """Dismiss up to *num_workers* workers, removing them from the pool."""
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()

    def __contains__(self, request_method):
        """Return True if *request_method* is currently tracked as running."""
        h = requesthash(request_method)
        # ``with`` guarantees the lock is released again.
        with self.runningNowLock:
            return h in self.runningNow

    def putRequest(self, request, block = True, timeout = 0):
        """Record *request* as running and put it on the request queue.

        *block* and *timeout* are passed straight to ``Queue.put``.
        """
        with self.runningNowLock:
            self.runningNow.add(requesthash(request.callable))

        self.requestsQueue.put(request, block, timeout)

    def wait(self):
        """Poll in blocking mode until NoResultsPending is raised."""
        # NOTE(review): no ``poll`` method is defined in this class as
        # decompiled, so this raises AttributeError unless one is provided
        # elsewhere (e.g. by a subclass) -- confirm before relying on wait().
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
-
-
-
def makeRequests(callable, args_list, callback = None, exc_callback = None):
    """Build one WorkRequest per entry of *args_list* for the same callable.

    A tuple entry is read as ``(positional_args, keyword_args)``; any other
    entry is passed to the callable as its single positional argument.
    Returns the list of created requests, in input order.
    """
    built = []
    for entry in args_list:
        if isinstance(entry, tuple):
            positional, keywords = entry[0], entry[1]
        else:
            positional, keywords = [entry], None
        built.append(WorkRequest(callable, positional, keywords, callback = callback, exc_callback = exc_callback))

    return built
-
if __name__ == '__main__':
    # Small self-test / usage demo.
    import random
    import time

    def do_something(data):
        """Sleep 1-5 seconds, then return a random fraction of *data*.

        Raises RuntimeError when the computed value exceeds 3.
        """
        time.sleep(random.randint(1, 5))
        result = round(random.random() * data, 5)
        if result > 3:
            raise RuntimeError('Something extraordinary happened!')

        return result

    def print_result(request, result):
        """Success callback: report the result of a request."""
        print('**Result: %s from request #%s' % (result, request.requestID))

    def handle_exception(request, exc_info):
        """Failure callback: report the exception of a request."""
        print('Exception occured in request #%s: %s' % (request.requestID, exc_info[1]))

    # 20 requests with a bare argument each ...
    data = [ random.randint(1, 10) for i in range(20) ]
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # ... and 20 more given as (args, kwds) tuples.
    data = [ ((random.randint(1, 10),), { }) for i in range(20) ]
    requests.extend(makeRequests(do_something, data, print_result, handle_exception))

    main = ThreadPool(3)
    for req in requests:
        main.putRequest(req)
        print('Work request #%s added.' % req.requestID)

    i = 0
    # NOTE(review): ThreadPool as decompiled defines no ``poll`` method, so
    # this loop depends on one being provided elsewhere -- confirm.
    while True:
        try:
            main.poll()
            print('Main thread working...')
            time.sleep(0.5)
            if i == 10:
                print('Adding 3 more worker threads...')
                main.createWorkers(3)

            i += 1
        except KeyboardInterrupt:
            print('Interrupted!')
            break
        except NoResultsPending:
            print('All results collected.')
            break
-
-